/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#define USING_LOG_PREFIX SQL_ENG

#include "sql/engine/connect_by/ob_nested_loop_connect_by_with_index.h"
#include "sql/engine/connect_by/ob_connect_by_utility_bfs.h"
#include "sql/engine/expr/ob_expr_null_safe_equal.h"
#include "sql/engine/sort/ob_base_sort.h"

using namespace oceanbase::sql;
using namespace oceanbase::common;

// Sizes paths_ to hold `size` entries through prepare_allocate().
// A negative size is rejected with OB_INVALID_ARGUMENT; a size of zero is
// accepted and leaves paths_ untouched.
int ObConnectByPumpBFS::PathNode::init_path_array(const int64_t size)
{
  int ret = common::OB_SUCCESS;
  if (size < 0) {
    ret = common::OB_INVALID_ARGUMENT;
    LOG_WARN("Invalid argument", K(size));
  } else if (size > 0) {
    // Only touch the array when there is something to allocate.
    if (OB_FAIL(paths_.prepare_allocate(size))) {
      LOG_WARN("Failed to init array", K(ret), K(size));
    }
  }
  return ret;
}

// Comparison predicate over two pump nodes, based on their output rows.
// The sort columns are walked in order until one of them yields a non-zero
// cell comparison; that column's direction decides the result. Rows that
// compare equal on every sort column yield false.
// Failures are reported through ret_: once ret_ holds an error the predicate
// stops comparing and always returns false.
bool ObConnectByPumpBFS::RowComparer::operator()(const PumpNode& pump_node1, const PumpNode& pump_node2)
{
  bool is_before = false;
  const ObNewRow* row1 = pump_node1.output_row_;
  const ObNewRow* row2 = pump_node2.output_row_;
  if (OB_UNLIKELY(OB_SUCCESS != ret_)) {
    // A previous comparison already failed; keep that error and do nothing.
  } else if (OB_ISNULL(row1) || OB_ISNULL(row2) || OB_UNLIKELY(row1->is_invalid()) || OB_UNLIKELY(row2->is_invalid()) ||
             OB_UNLIKELY(row1->count_ != row2->count_)) {
    ret_ = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid parameter", KPC(row1), KPC(row2), K(ret_));
  } else {
    int cmp = 0;
    int64_t i = 0;
    // Stop at the first error or at the first column that tells the rows apart.
    while (OB_SUCCESS == ret_ && 0 == cmp && i < sort_columns_.count()) {
      int64_t idx = sort_columns_.at(i).index_;
      if (OB_UNLIKELY(row1->count_ < idx + 1)) {
        ret_ = OB_ERR_UNEXPECTED;
        LOG_WARN("invalid idx", KPC(row1), KPC(row2), K(idx), K(ret_));
      } else {
        cmp = row1->cells_[idx].compare(row2->cells_[idx], sort_columns_.at(i).cs_type_);
        if (0 != cmp) {
          // Smaller-first for ascending columns, larger-first otherwise.
          const bool ascending = sort_columns_.at(i).is_ascending();
          is_before = (cmp < 0) ? ascending : !ascending;
        }
      }
      ++i;
    }
  }
  return is_before;
}

// Invoked on ObConnectBy rescan: releases the rows held for the previous scan
// and restores the state the pump had right after open.
void ObConnectByPumpBFS::reset()
{
  // Release the memory owned by the stacks before dropping the containers.
  free_memory_for_rescan();

  free_record_.reset();
  path_stack_.reset();
  pump_stack_.reset();

  // Traversal restarts at level 1 with every output flag cleared.
  is_output_leaf_ = false;
  is_output_cycle_ = false;
  is_output_level_ = false;
  cur_level_ = 1;
}

// Moves every node collected in sort_stack_ onto pump_stack_.
// When need_sort_siblings_ is set, sort_stack_ is first ordered with
// RowComparer over sort_columns_ (a NULL sort_columns_ is an error).
// Nodes are popped from the back of sort_stack_ and pushed onto pump_stack_,
// so their order is reversed; get_next_row() in turn pops from the back of
// pump_stack_.
// Returns OB_ERR_CBY_NO_MEMORY when the combined node count reaches
// CONNECT_BY_MAX_NODE_NUM; note that this overrides an earlier error in ret.
int ObConnectByPumpBFS::sort_sibling_rows()
{
  int ret = OB_SUCCESS;
  PumpNode pump_node;
  if (!need_sort_siblings_) {
    // Nothing to sort; the nodes are transferred as they are.
  } else if (OB_ISNULL(sort_columns_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unexpected sort columns", K(ret));
  } else if (0 != sort_columns_->count() && false == sort_stack_.empty()) {
    // The comparator is handed ret so that it can report comparison failures.
    RowComparer compare(*sort_columns_, ret);
    PumpNode* first_node = &sort_stack_.at(0);
    std::sort(first_node, first_node + sort_stack_.count(), compare);
  }
  if (pump_stack_.count() + sort_stack_.count() >= CONNECT_BY_MAX_NODE_NUM) {
    ret = OB_ERR_CBY_NO_MEMORY;
    int64_t max_node_num = CONNECT_BY_MAX_NODE_NUM;
    LOG_WARN("connect by reach memory limit", K(ret), K(pump_stack_.count()), K(sort_stack_.count()), K(max_node_num));
  }
  // add siblings to pump stack
  while (OB_SUCC(ret) && false == sort_stack_.empty()) {
    pump_node.reset();
    if (OB_FAIL(sort_stack_.pop_back(pump_node))) {
      LOG_WARN("fail to pop back stack", K(ret));
    } else if (OB_FAIL(pump_stack_.push_back(pump_node))) {
      // Fixed: this branch used to report a pop failure.
      LOG_WARN("fail to push back stack", K(ret));
    }
  }
  return ret;
}

// Registers a root row together with its output row.
// Both pump_stack_ and path_stack_ must be empty and both pointers must be
// non-NULL, otherwise OB_ERR_UNEXPECTED is returned. The rows are handed to
// push_back_row_to_stack().
int ObConnectByPumpBFS::add_root_row(const ObNewRow* root_row, const ObNewRow* output_row)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!pump_stack_.empty()) || OB_UNLIKELY(!path_stack_.empty())) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("pump stack must be empty", K(pump_stack_), K(path_stack_), K(ret));
  } else if (OB_ISNULL(root_row) || OB_ISNULL(output_row)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid parameter", K(root_row), K(output_row), K(ret));
  } else {
    if (OB_FAIL(push_back_row_to_stack(*root_row, *output_row))) {
      LOG_WARN("fail to push back row", K(ret));
    } else {
      LOG_DEBUG("connect by add row", K(*root_row), K(*output_row), K(lbt()));
    }
  }
  return ret;
}

void ObConnectByPumpBFS::free_memory_for_rescan()
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(free_path_stack())) {
    LOG_ERROR("fail to free path stack", K(ret));
  }

  if (OB_FAIL(free_pump_node_stack(pump_stack_))) {
    LOG_ERROR("fail to free pump stack", K(ret));
  }

  if (OB_FAIL(free_pump_node_stack(sort_stack_))) {
    LOG_ERROR("fail to free sort stack", K(ret));
  }

  if (OB_FAIL(free_record_rows())) {
    LOG_ERROR("fail to free record rows", K(ret));
  }

  if (connect_by_root_row_ != NULL) {
    allocator_.free(const_cast<ObNewRow*>(connect_by_root_row_));
    connect_by_root_row_ = NULL;
  }
}
// Releases everything free_memory_for_rescan() releases and, in addition, the
// cells and projector buffers of shallow_row_.
void ObConnectByPumpBFS::free_memory()
{
  free_memory_for_rescan();

  if (NULL != shallow_row_.cells_) {
    allocator_.free(shallow_row_.cells_);
    shallow_row_.cells_ = NULL;
  }
  if (NULL != shallow_row_.projector_) {
    allocator_.free(shallow_row_.projector_);
    shallow_row_.projector_ = NULL;
  }
}

int ObConnectByPumpBFS::free_path_stack()
{
  int ret = OB_SUCCESS;
  PathNode pop_node;
  while (OB_SUCC(ret) && false == path_stack_.empty()) {
    if (OB_FAIL(path_stack_.pop_back(pop_node))) {
      LOG_WARN("fail to pop back", K(ret));
    } else if (OB_ISNULL(pop_node.prior_exprs_result_)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("invalid pop node", K(ret));
    } else {
      allocator_.free(const_cast<ObNewRow*>(pop_node.prior_exprs_result_));
      pop_node.prior_exprs_result_ = NULL;
      for (int64_t i = 0; OB_SUCC(ret) && i < connect_by_path_count_; ++i) {
        if (pop_node.paths_.at(i).ptr() != NULL) {
          allocator_.free(pop_node.paths_.at(i).ptr());
          pop_node.paths_.at(i).make_empty_string();
        }
      }
    }
  }
  return ret;
}

// Pops every node off `stack` and frees its pump row, output row and
// prior-exprs result row. A path buffer that is still set on such a node is
// logged as an error and freed as well.
// Stops with OB_ERR_UNEXPECTED at the first node that has any of the three row
// pointers NULL; nothing of that node is freed.
int ObConnectByPumpBFS::free_pump_node_stack(ObIArray<PumpNode>& stack)
{
  int ret = OB_SUCCESS;
  PumpNode pop_node;
  while (OB_SUCC(ret) && !stack.empty()) {
    if (OB_FAIL(stack.pop_back(pop_node))) {
      LOG_WARN("fail to pop back pump_stack", K(ret));
    } else if (OB_ISNULL(pop_node.pump_row_) || OB_ISNULL(pop_node.output_row_) ||
               OB_ISNULL(pop_node.path_node_.prior_exprs_result_)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("invalid pop node", K(pop_node), K(ret));
    } else {
      const ObNewRow* pump_row = pop_node.pump_row_;
      const ObNewRow* output_row = pop_node.output_row_;
      const ObNewRow* prior_result = pop_node.path_node_.prior_exprs_result_;
      allocator_.free(const_cast<ObNewRow*>(pump_row));
      allocator_.free(const_cast<ObNewRow*>(output_row));
      allocator_.free(const_cast<ObNewRow*>(prior_result));
      pop_node.pump_row_ = NULL;
      pop_node.output_row_ = NULL;
      pop_node.path_node_.prior_exprs_result_ = NULL;
      // ret is not modified below, so the loop needs no ret guard.
      for (int64_t i = 0; i < connect_by_path_count_; ++i) {
        if (NULL != pop_node.path_node_.paths_[i].ptr()) {
          LOG_ERROR("unexpected path value");
          allocator_.free(pop_node.path_node_.paths_.at(i).ptr());
          pop_node.path_node_.paths_.at(i).make_empty_string();
        }
      }
    }
  }
  return ret;
}

int ObConnectByPumpBFS::push_back_row_to_stack(const ObNewRow& left_row, const ObNewRow& output_row)
{
  int ret = OB_SUCCESS;
  const ObNewRow* new_left_row = NULL;
  const ObNewRow* new_output_row = NULL;
  PumpNode pump_node;
  if (OB_FAIL(calc_path_node(left_row, pump_node))) {
    LOG_WARN("fail to calc path node", K(pump_node), K(ret));
  } else if (OB_FAIL(deep_copy_row(left_row, new_left_row))) {
    LOG_WARN("fail to deep copy", K(ret));
  } else if (OB_FAIL(deep_copy_row(output_row, new_output_row))) {
    LOG_WARN("fail to deep copy row", K(ret));
  } else if (OB_ISNULL(pump_node.pump_row_ = new_left_row) || OB_ISNULL(pump_node.output_row_ = new_output_row)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("copy row is NULL", KPC(new_output_row), KPC(new_left_row), K(ret));
  } else if (OB_FAIL(sort_stack_.push_back(pump_node))) {
    LOG_WARN("fail to push back row", K(ret));
  } else {
    LOG_DEBUG("connect by pump node", K(pump_node), K(left_row), K(*new_left_row), K(output_row), K(*new_left_row));
  }

  if (OB_FAIL(ret)) {  // if fail free memory
    if (pump_node.path_node_.prior_exprs_result_ != NULL) {
      allocator_.free(const_cast<ObNewRow*>(pump_node.path_node_.prior_exprs_result_));
      pump_node.path_node_.prior_exprs_result_ = NULL;
    }
    if (new_left_row != NULL) {
      allocator_.free(const_cast<ObNewRow*>(new_left_row));
      new_left_row = NULL;
      pump_node.pump_row_ = NULL;
    }
    if (new_output_row != NULL) {
      allocator_.free(const_cast<ObNewRow*>(new_output_row));
      new_output_row = NULL;
      pump_node.output_row_ = NULL;
    }
  }
  return ret;
}

// Adds a child row: construct_pump_row() is run on right_row, and the
// resulting shallow_row_ is pushed together with output_row through
// push_back_row_to_stack().
// Returns OB_ERR_UNEXPECTED when either pointer is NULL.
int ObConnectByPumpBFS::append_row(const ObNewRow* right_row, const ObNewRow* output_row)
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(right_row) || OB_ISNULL(output_row)) {
    ret = OB_ERR_UNEXPECTED;
    // The error code is now part of the message, as in the sibling functions.
    LOG_WARN("invalid argument", K(right_row), KPC(output_row), K(ret));
  } else if (OB_FAIL(construct_pump_row(right_row))) {
    LOG_WARN("fail to construct pump row", K(ret));
  } else if (OB_FAIL(push_back_row_to_stack(shallow_row_, *output_row))) {
    LOG_WARN("fail to push back row to stack", K(ret));
  }
  return ret;
}

int ObConnectByPumpBFS::init(const ObConnectByWithIndex& connect_by, common::ObExprCtx* expr_ctx)
{
  int ret = OB_SUCCESS;
  const ObPhyOperator* left_op = NULL;
  if (OB_UNLIKELY(is_inited_)) {
    ret = OB_INIT_TWICE;
    LOG_WARN("init twice", K(ret));
  } else if (OB_ISNULL(expr_ctx) || OB_ISNULL(expr_ctx->my_session_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("expr_ctx or session is null", K(ret));
  } else if (OB_UNLIKELY(2 != connect_by.get_child_num())) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid child num", K(ret));
  } else if (OB_ISNULL(left_op = connect_by.get_child(0))) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("left operator is NULL", K(ret));
  } else if (OB_FAIL(alloc_shallow_row_cells(*left_op))) {
    LOG_WARN("fail to alloc shallow row cells", K(ret));
  } else if (OB_FAIL(alloc_shallow_row_projector(*left_op))) {
    LOG_WARN("fail to alloc shallow row projector", K(ret));
  } else {
    uint64_t tenant_id = expr_ctx->my_session_->get_effective_tenant_id();
    allocator_.set_tenant_id(tenant_id);
    pump_row_desc_ = connect_by.get_pump_row_desc();
    pseudo_column_row_desc_ = connect_by.get_pseudo_column_row_desc();
    connect_by_prior_exprs_ = connect_by.get_connect_by_prior_exprs();
    sort_columns_ = connect_by.get_sort_siblings_columns();
    expr_ctx_ = expr_ctx;
    need_sort_siblings_ = connect_by.get_need_sort_siblings();
    connect_by_path_count_ = connect_by.get_sys_connect_by_path_expression_count();
    if (OB_FAIL(check_output_pseudo_columns())) {
      LOG_WARN("fail to check output pseudo columns", K(ret));
    } else if (OB_FAIL(check_pump_row_desc())) {
      LOG_WARN("fail to check pump row desc", K(ret));
    } else {
      is_inited_ = true;
    }
  }
  return ret;
}

int ObConnectByPumpBFS::free_record_rows()
{
  // free last pump row and output row
  int ret = OB_SUCCESS;
  int64_t free_record_count = free_record_.count();
  if (OB_UNLIKELY(free_record_count != 2) && OB_UNLIKELY(free_record_count != 0)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unexpected free_record count", K(free_record_count));
  } else if (free_record_count != 0) {
    for (int i = 0; OB_SUCC(ret) && i < free_record_count; ++i) {
      const ObNewRow* free_row = NULL;
      if (OB_FAIL(free_record_.pop_back(free_row))) {
        LOG_WARN("fail to pop back value", K(i), K(ret));
      } else {
        allocator_.free(const_cast<ObNewRow*>(free_row));
        free_row = NULL;
      }
    }
  }
  return ret;
}

// Places path_node on path_stack_ so that the stack again holds one node per
// level, ending at path_node.
//  - Empty stack: path_node is pushed as is.
//  - Otherwise nodes are popped (and their prior-exprs result row and path
//    buffers freed) while the top node's level is >= path_node's level. The
//    node is pushed once the top is exactly one level above it, or once the
//    stack has become empty.
//  - A top node more than one level above path_node is an error
//    (OB_ERR_UNEXPECTED).
// If path_node could not be pushed, its prior_exprs_result_ is freed here.
int ObConnectByPumpBFS::add_path_stack(PathNode& path_node)
{
  int ret = OB_SUCCESS;
  bool has_added = false;
  PathNode pop_node;
  if (OB_FAIL(pop_node.init_path_array(connect_by_path_count_))) {
    LOG_WARN("Failed to init path array", K(ret));
  } else if (0 == path_stack_.count()) {
    if (OB_FAIL(path_stack_.push_back(path_node))) {
      LOG_WARN("fail to add path node", K(path_node), K(ret));
    } else {
      LOG_DEBUG("Push back path node", K(path_node));
      has_added = true;
    }
  } else {
    while (OB_SUCC(ret) && false == has_added && false == path_stack_.empty()) {
      // cur_node refers to the stack top; it is not used again after a pop.
      PathNode& cur_node = path_stack_.at(path_stack_.count() - 1);
      if (cur_node.level_ == path_node.level_ - 1) {
        // The top node is the direct parent level: append below it.
        if (OB_FAIL(path_stack_.push_back(path_node))) {
          LOG_WARN("fail to add path node", K(path_node), K(ret));
        } else {
          has_added = true;
          LOG_DEBUG("Push back path node", K(path_node));
        }
      } else if (cur_node.level_ > path_node.level_ - 1) {
        // The top node is at the same or a deeper level: discard it.
        if (OB_FAIL(path_stack_.pop_back(pop_node))) {
          LOG_WARN("fail to pop back", K(ret));
        } else if (OB_ISNULL(pop_node.prior_exprs_result_)) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("invalid pop node", K(ret));
        } else {
          // Fixed: log the node being popped (still intact here), not the
          // incoming one.
          LOG_DEBUG("Pop path node", K(pop_node));
          allocator_.free(const_cast<ObNewRow*>(pop_node.prior_exprs_result_));
          pop_node.prior_exprs_result_ = NULL;
          for (int64_t i = 0; OB_SUCC(ret) && i < connect_by_path_count_; ++i) {
            if (pop_node.paths_[i].ptr() != NULL) {
              allocator_.free(pop_node.paths_.at(i).ptr());
              pop_node.paths_.at(i).make_empty_string();
            }
          }
          if (0 == path_stack_.count()) {  // current path_node is root node
            if (OB_FAIL(path_stack_.push_back(path_node))) {
              LOG_WARN("fail to push back path_node", K(path_node), K(ret));
            } else {
              LOG_DEBUG("Push back path node", K(path_node));
              has_added = true;
            }
          }
        }
      } else {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("invalid cur_node level", K(cur_node), K(ret));
      }
    }
  }
  if (false == has_added) {  // free memory
    allocator_.free(const_cast<ObNewRow*>(path_node.prior_exprs_result_));
    path_node.prior_exprs_result_ = NULL;
  }

  LOG_DEBUG("connect by get path node", K(path_node));

  return ret;
}

// Remembers cur_row as the connect-by root row when the node on top of
// path_stack_ is at level 1. Any previously stored root row is freed first
// and cur_row is deep copied. For other levels nothing is changed.
// Returns OB_ERR_UNEXPECTED for a NULL row or an empty path stack.
int ObConnectByPumpBFS::set_connect_by_root_row(const ObNewRow* cur_row)
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(cur_row) || OB_UNLIKELY(path_stack_.count() <= 0)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid path stack", K(path_stack_), K(ret));
  } else if (OB_UNLIKELY(1 == path_stack_.at(path_stack_.count() - 1).level_)) {
    if (NULL != connect_by_root_row_) {
      allocator_.free(const_cast<ObNewRow*>(connect_by_root_row_));
      connect_by_root_row_ = NULL;
    }
    if (OB_FAIL(deep_copy_row(*cur_row, connect_by_root_row_))) {
      LOG_WARN("fail to deep copy row", K(cur_row), K(ret));
    }
  }
  return ret;
}

int ObConnectByPumpBFS::get_next_row(const ObNewRow*& pump_row, const ObNewRow*& output_row)
{
  int ret = OB_SUCCESS;
  PumpNode pump_node;
  if (OB_FAIL(free_record_rows())) {
    LOG_WARN("fail to free record rows", K(ret));
  } else if (OB_FAIL(pump_stack_.pop_back(pump_node))) {
    if (ret != OB_ENTRY_NOT_EXIST) {
      LOG_WARN("fail to pop back value", K(ret));
    }
  } else if (OB_FAIL(free_record_.push_back(pump_node.pump_row_))) {
    LOG_WARN("fail to push back value", K(ret));
  } else if (OB_FAIL(free_record_.push_back(pump_node.output_row_))) {
    LOG_WARN("fail to push back value", K(ret));
  } else if (OB_FAIL(add_path_stack(pump_node.path_node_))) {
    LOG_WARN("fail to add path stack", K(pump_node), K(ret));
  } else {
    pump_row = pump_node.pump_row_;
    output_row = pump_node.output_row_;
    cur_level_ = pump_node.path_node_.level_ + 1;
    LOG_DEBUG("connect by get output pump node", K(pump_node));
  }

  if (OB_ENTRY_NOT_EXIST == ret) {
    ret = OB_ITER_END;
  }

  return ret;
}

int ObConnectByPumpBFS::add_path_node(const ObNewRow& row, PumpNode& pump_node)
{
  int ret = OB_SUCCESS;
  PathNode node;
  if (OB_FAIL(node.init_path_array(connect_by_path_count_))) {
    LOG_WARN("Failed to init path array", K(ret));
  } else if (OB_FAIL(check_cycle_path(row))) {
    LOG_WARN("fail to check cycle path", K(node), K(ret));
  } else if (OB_FAIL(deep_copy_row(row, node.prior_exprs_result_))) {
    LOG_WARN("fail to deep copy row", K(row), K(ret));
  } else {
    node.level_ = cur_level_;
    pump_node.path_node_ = node;
  }
  return ret;
}

// Computes the path node of pump_node for left_row.
//  - Without prior expressions, left_row itself is passed to add_path_node().
//  - Otherwise every prior expression is evaluated against left_row into a
//    temporary row, and that row is passed to add_path_node().
// The temporary row's cells live in this function's stack frame (alloca);
// that is safe because add_path_node() deep copies the row it keeps.
// Returns OB_ERR_UNEXPECTED when connect_by_prior_exprs_ or expr_ctx_ is NULL.
int ObConnectByPumpBFS::calc_path_node(const ObNewRow& left_row, PumpNode& pump_node)
{
  int ret = OB_SUCCESS;
  ObNewRow row;
  if (OB_ISNULL(connect_by_prior_exprs_) || OB_ISNULL(expr_ctx_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid data member", KPC(connect_by_prior_exprs_), K(ret));
  } else if (OB_UNLIKELY(connect_by_prior_exprs_->get_size() == 0)) {
    if (OB_FAIL(add_path_node(left_row, pump_node))) {
      LOG_WARN("fail to add path node", K(left_row), K(ret));
    }
  } else if (connect_by_prior_exprs_->get_size() > 0) {

    // allocate prior_expr_result_row
    int result_cnt = connect_by_prior_exprs_->get_size();
    void* ptr = alloca(result_cnt * sizeof(ObObj));
    if (OB_ISNULL(ptr)) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_WARN("fail to alloc memory on stack", K(result_cnt * sizeof(ObObj)), K(ret));
    } else {
      row.cells_ = new (ptr) ObObj[result_cnt];
      row.count_ = result_cnt;
    }

    if (OB_SUCC(ret)) {
      // calc prior expr result
      // NOTE(review): DLIST_FOREACH presumably stops iterating once ret holds
      // an error; confirm against the macro definition.
      int idx = 0;
      DLIST_FOREACH(expr, *connect_by_prior_exprs_)
      {
        if (OB_FAIL(expr->calc(*expr_ctx_, left_row, row.cells_[idx]))) {
          LOG_WARN("fail to calc prior expr", KPC(expr), K(ret));
        } else {
          ++idx;
        }
      }
    }

    LOG_DEBUG("connect by after prior exprs", K(left_row), K(row), K(cur_level_));
    // check cycle and add node
    // add_path_node() builds its own PathNode, so none is prepared here; the
    // unused local node and its path-array allocation were removed.
    if (OB_SUCC(ret)) {
      if (OB_FAIL(add_path_node(row, pump_node))) {
        LOG_WARN("fail to add path node", K(row), K(ret));
      }
    }
  }
  return ret;
}

int ObConnectByPumpBFS::check_cycle_path(const ObNewRow& row)
{
  int ret = OB_SUCCESS;
  ObObj cmp_result;
  bool always_false = false;
  if (OB_ISNULL(expr_ctx_) || OB_ISNULL(pump_row_desc_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid data member", K(ret));
  } else {
    /*
     * What is a pump row ?
     * We transform right row to a new left row, and we call this new
     * left row a pump row.
     * Why is the pump_row_desc_ empty ?
     * If the connect by join doesn't include any join condition,
     * say, select 1 from dual connect by level < 5, the pump_row_desc is empty.
     * We can say pump_row_desc_ is only for cycle detect.
     * A empty pump_row_desc_ means we never get a cycle in this connect by join.
     *
     */
    always_false = never_meet_cycle_;
  }

  for (int64_t i = 0; OB_SUCC(ret) && i < path_stack_.count() && !always_false; ++i) {
    const PathNode& cur_node = path_stack_.at(i);
    // prior exprs in connect_by conditions are all const exprs, such as connect by prior 0 = 0,
    // so there is a loop when level = 2.
    if (connect_by_prior_exprs_->get_size() == 0) {
      ret = OB_ERR_CBY_LOOP;
      LOG_WARN("CONNECT BY loop in user data", K(ret), K(cur_node), K(row));
    } else if (OB_FAIL(ObExprNullSafeEqual::compare_row2(cmp_result, &row, cur_node.prior_exprs_result_, *expr_ctx_))) {
      LOG_WARN("fail to compare prior_exprs_result", K(row), K(cur_node), K(ret));
    } else if (cmp_result.is_true()) {
      ret = OB_ERR_CBY_LOOP;
      LOG_WARN("CONNECT BY loop in user data", K(row), K(cur_node), K(ret));
    }
  }
  return ret;
}

// Reads the idx-th path string of the parent of the node on top of
// path_stack_ into p_path.
//  - Empty path stack: OB_ERR_UNEXPECTED.
//  - A single node on the stack (no parent): p_path becomes the empty string,
//    whatever idx is.
//  - idx negative, or outside the path array of the top node or of the parent
//    node: OB_ERR_UNEXPECTED.
int ObConnectByPumpBFS::get_parent_path(int64_t idx, ObString& p_path) const
{
  int ret = OB_SUCCESS;
  int64_t node_cnt = path_stack_.count();
  if (OB_UNLIKELY(node_cnt <= 0)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid path stack", K(node_cnt), K(ret));
  } else if (1 == node_cnt) {
    p_path.make_empty_string();
  } else if (OB_UNLIKELY(idx < 0) || idx >= path_stack_.at(node_cnt - 1).paths_.count() ||
             idx >= path_stack_.at(node_cnt - 2).paths_.count()) {
    // The parent node is the one indexed below, so its bound is checked too;
    // negative indexes are rejected as in set_cur_node_path().
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid path stack",
        K(idx),
        K(path_stack_.at(node_cnt - 1).paths_.count()),
        K(path_stack_.at(node_cnt - 2).paths_.count()),
        K(ret));
  } else {
    p_path = path_stack_.at(node_cnt - 2).paths_.at(idx);
  }
  return ret;
}

// Stores a copy of `path` as the idx-th path string of the node on top of
// path_stack_. The copy is made with ob_write_string() on allocator_; the
// buffer previously held in that slot, if any, is freed afterwards, whether
// or not the copy succeeded.
// Returns OB_ERR_UNEXPECTED for an empty path stack or an out-of-range idx.
int ObConnectByPumpBFS::set_cur_node_path(int64_t idx, const ObString& path)
{
  int ret = OB_SUCCESS;
  const int64_t node_cnt = path_stack_.count();
  if (OB_UNLIKELY(node_cnt <= 0) || idx < 0) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid path stack", K(node_cnt), K(ret));
  } else {
    PathNode& cur_node = path_stack_.at(node_cnt - 1);
    if (idx >= cur_node.paths_.count()) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("invalid path stack", K(idx), K(path_stack_.at(node_cnt - 1).paths_.count()), K(ret));
    } else {
      // Remember the old buffer before the slot is overwritten.
      char* old_buf = cur_node.paths_.at(idx).ptr();
      if (OB_FAIL(ob_write_string(allocator_, path, cur_node.paths_.at(idx)))) {
        LOG_WARN("fail to copy string", K(path), K(ret));
      }
      if (old_buf != NULL) {
        allocator_.free(old_buf);
      }
    }
  }
  return ret;
}
